package org.sraosha.framework.manager;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.List;

/**
 * rocketmq管理器类
 */
public class RocketMqManager {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 普通文本消息，MQ同步发送String类型的消息
     *
     * @param topic
     * @param message
     * @return
     */
    public SendResult sendMessage(String topic, String message) {
        return rocketMQTemplate.syncSend(topic, message);
    }

    /**
     * 发送对象消息，MQ同步发送对象类型的消息
     *
     * @return
     */
    public SendResult sendObjectMessage(String topic, Object message) {
        return rocketMQTemplate.syncSend(topic, message);
    }

    /**
     * 发送顺序消息
     *
     * @return
     */
    public SendResult sendMessageOrderly(String topic, List list) {
        SendResult sendResult = null;
        Random random = new Random();
        for (int i = 0; i < list.size(); i++) {
            sendResult = rocketMQTemplate.syncSendOrderly(topic, list.get(i), random.toString());
//            sendResult = rocketMQTemplate.syncSend(topic, list.get(i));// 如果使用这行代码发送消息的话，则消息是无序的
        }
        return sendResult;
    }

    /**
     * 异步发送:
     * 指发送方发出数据后，不等接收方发回响应，接着发送下个数据包的通讯方式。
     * MQ的异步发送，需要用户实现异步发送回调接口（SendCallback），在执行消息的异步发送时，
     * 应用不需要等待服务器响应即可直接返回，通过回调接口接收务器响应，并对服务器的响应结果进行处理。
     */
    public void asyncSendMQMessageSend(String topic, String message) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
            }

            @Override
            public void onException(Throwable throwable) {
            }
        });
    }

    /**
     * 异步发送:
     * 指发送方发出数据后，不等接收方发回响应，接着发送下个数据包的通讯方式。
     * MQ的异步发送，需要用户实现异步发送回调接口（SendCallback），在执行消息的异步发送时，
     * 应用不需要等待服务器响应即可直接返回，通过回调接口接收务器响应，并对服务器的响应结果进行处理。
     */
    public void asyncSendMQMessageSend(String topic, Object message) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
            }

            @Override
            public void onException(Throwable throwable) {
            }
        });
    }

    /**
     * 发送不关心结果的消息
     */
    public void oneWaySendMQMessageSend(String topic, String message) {
        rocketMQTemplate.sendOneWay(topic, message);
    }

    /**
     * 发送不关心结果的消息
     */
    public void oneWaySendMQMessageSend(String topic, Object message) {
        rocketMQTemplate.sendOneWay(topic, message);
    }

    /**
     * 发送延迟消息
     *
     * @param topic
     * @param message
     * @return
     */
    public SendResult delayedSendMQMessageSend(String topic, Object message, int delayLevel) {
        Message<Object> objectMessage = MessageBuilder.withPayload(message).build();
        return rocketMQTemplate.syncSend(topic, objectMessage, 1000, delayLevel);
    }

    /**
     * 发送延迟消息
     *
     * @param topic
     * @param message
     * @return
     */
    public SendResult delayedSendMQMessageSend(String topic, Object message) {
        return delayedSendMQMessageSend(topic, message, 5);
    }

    /**
     * 发送事务消息
     *
     * @return
     */
    public TransactionSendResult transactionMQSend(String topic, Object message) {
        //同步阻塞
        CountDownLatch latch = new CountDownLatch(1);

        String transactionId = UUID.randomUUID().toString();
        Message<Object> objectMessage = MessageBuilder
                .withPayload(message)
                .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                .setHeader(RocketMQHeaders.KEYS, message.hashCode())
                .build();
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, objectMessage, latch);

        if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)
                && sendResult.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
        }
        try {
            latch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return sendResult;
    }
}
